package org.databandtech.mysql2elasticsearch;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.databandtech.mysql2elasticsearch.source.SourceFromMySQLByTuple5;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

/**
 * Results can be verified via Kibana at http://localhost:5601/
 * with the query: GET /my_videos/_search
 */
public class SourceByJDBCSinkToElasticSearch {	

	//数据库相关
	final static String URL = "jdbc:mysql://127.0.0.1:3307/databand?useUnicode=true&characterEncoding=utf-8&useSSL=false";
	final static String USER = "root";
	final static String PASS = "mysql";
	final static String SQL = "SELECT title,status,vid,cover_id,url from b_video ORDER BY id DESC LIMIT 100";
	final static String[] COLUMNS_READ = new String[] {"title","status","vid","cover_id","url"};

	//es 配置
	final static int BULK_FLUSH_MAX_ACTIONS = 50; //每一个批次处理的数量
	final static int STREAM_SINK_PARALLELISM = 4;//并行线程
	final static String ES_HOSTNAME = "127.0.0.1";
	final static int ES_PORT = 9200;
	final static String ES_SCHEME = "http";
	final static String INDEX = "my_videos";
	
	public static void main(String[] args) {
		
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000));
        
        //######### 数据读取 #########
        //取2个字段
        //DataStreamSource<Tuple2<String,Integer>> userDataStreamSource = env.addSource(
        //new SourceFromMySQL(url,user,pass,columnsRead,sqlRead));

        //取5个字段,10万条数据灌入大概5秒
        DataStreamSource<Tuple5<String,Integer,String,String,String>> userDataStreamSource = env.addSource(
				new SourceFromMySQLByTuple5(URL,USER,PASS,COLUMNS_READ,SQL));
		
        userDataStreamSource.print();
       
        List<HttpHost> httpHosts = new ArrayList<HttpHost>();
        httpHosts.add(new HttpHost(ES_HOSTNAME, ES_PORT, ES_SCHEME));
		//######### ES写入  #########
		//第一种方式，不用转换的直接灌数据
		addSinkByDataStreamSourceByTuple5(httpHosts,BULK_FLUSH_MAX_ACTIONS,userDataStreamSource,INDEX);
		//第二种方式，需要转换后灌数据
		//map 規則是30歲以下的獎金 1000，30-60嵗獎金2000，其餘的3000
		//SingleOutputStreamOperator<Tuple3<String,Integer,BigDecimal>> dataWriteStream = userDataStreamSource.map(new MapTransformation());
		//dataWriteStream.print();
		//ESSinkUtil.addSink(httpHosts, BULK_FLUSH_MAX_ACTIONS, sinkParallelism, dataSingleOutputStream,new ESSinkFunction("a"));
		
		try {
			env.execute("ok-es");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void addSinkByDataStreamSourceByTuple5(List<HttpHost> httpHosts, int bulkFlushMaxActions,
			DataStreamSource<Tuple5<String, Integer, String, String, String>> userDataStreamSource, String index) {

		ElasticsearchSink.Builder<Tuple5<String, Integer, String, String, String>> esSinkBuilder = 
				new ElasticsearchSink.Builder<Tuple5<String, Integer, String, String, String>>(
			    httpHosts,
			    new ElasticsearchSinkFunction<Tuple5<String, Integer, String, String, String>>() {

					private static final long serialVersionUID = 1L;

					public IndexRequest createIndexRequest(Tuple5<String, Integer, String, String, String> element) {
			            Map<String, Object> json = new HashMap<String, Object>();
			            int index = 0;
			            for (String column : COLUMNS_READ) {
			            	json.put(column, element.getField(index));
			            	index++;
			            }
			            return Requests.indexRequest()
			                    .index(INDEX)
			                    .source(json);
			        }

			        public void process(Tuple5<String, Integer, String, String, String> element, RuntimeContext ctx, RequestIndexer indexer) {
			            indexer.add(createIndexRequest(element));
			        }
			    }
			);
		esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
		userDataStreamSource.addSink(esSinkBuilder.build());
	}
	

}
